Welcome to my blog!

Migrate to Kubeflow Pipelines v2 – kfp sdk examples

KFP sdk v2 is the current focus of the Kubeflow community, which means it will receive ongoing updates, improvements, and support. Kfp v1 is legacy and there will be no further updates or support.

In this short blog post I will present some of the changes:

Environment variables

Instead:

ENV_VARIABLE = V1EnvVar(name="ENV_VARIABLE", value="test_value")
op = print_env_variable().add_env_variable(ENV_VARIABLE)    

Do:

op = print_env_variable().set_env_variable(name="ENV_VARIABLE", value="test_value")

Pod labels

Instead:

op = print_op().add_pod_label("label", "value")

Do:


from kfp import kubernetes
op = print_op()
kubernetes.add_pod_label(op, "some key", "some value")

Pod annotations

Instead:

op = print_op().set_pod_annotation("some key","some value")

Do:

from kfp import kubernetes
op = print_op()
kubernetes.add_pod_annotation(op,"some key","some value")

ParallelFor

Instead:

with dsl.ParallelFor(loop_args=ids, parallelism=0) as id:
    print_op(text=id)

Do:

with dsl.ParallelFor(items=ids, parallelism=0) as id:
    print_op(text=str(id))

In case of parallelFor inside parallelFor, the old way was:

    with dsl.ParallelFor(loop_args=asins, parallelism=0) as asin:
        with dsl.ParallelFor(loop_args=trials, parallelism=0) as trial:
            print_op(asin=asin, trial=trial)

Now you can use:

     with dsl.ParallelFor(items=asins, parallelism=0) as asin:
        for trial in trials:
            print_op(asin=str(asin), trial=str(trial))

dsl.condition is now dsl.If

Example:

from kfp import dsl
with dsl.If(condition=evaluate_task.output > 0.90, name="accuracy > 0.90"):

Make sure that all component calls have named parameters

Before:

op = print_op("some text")

Now:

op = print_op(text="some text")

Output of a component with return

If your component return value as python method:

@component()
def my_component() -> int:
    return 5

Then you use that output in another component:

my_component.output

Before you were able to write (now it will fail)

my_component.outputs["output"]

Attach secrets as environment variables

Before:

from kfp.onprem import use_k8s_secret
minio_comp().apply(
        use_k8s_secret(
            secret_name="mlpipeline-minio-artifact",
            k8s_secret_key_to_env={
                "accesskey": "ACCESS_KEY",
                "secretkey": "SECRET_KEY",
            },
        )
    )

Now:


from kfp import kubernetes
list_op = minio_comp()
kubernetes.use_secret_as_env(
    list_op,
    secret_name="mlpipeline-minio-artifact",
    secret_key_to_env={
            "accesskey": "ACCESS_KEY",
            "secretkey": "SECRET_KEY",
        },
)

Attach pvc volume to a component

Before:

train_op = train().add_pvolumes({"/mnt": dsl.PipelineVolume(pvc=volume_name)})
    

Now:

from kfp import kubernetes
# keep this "mnt" as it is
train_op = train()
kubernetes.mount_pvc(
        task=train_op, pvc_name=volume_name, mount_path="/mnt"
    )

Leave a Reply

Your email address will not be published. Required fields are marked *